package com.ideaaedi.commonds.sort;


import java.util.concurrent.CountedCompleter;

/**
 * Fork/Join之CountedCompleter实现 多线程归并排序
 *
 * P.S. 好吧，我写的归并算法的实现， 没有把归并算法的最佳性能发挥出来。。。。。。
 *      简单测试发现: 当 数据量处于(0, 1万]时， Collections.sort性能优于MergeSortCompleter
 *                  当 数据量处于(1万, 100万]时， MergeSortCompleter性能优于Collections.sort
 *                  当 数据量处于(100万, 2000万]时， Collections.sort性能优于MergeSortCompleter
 *                  。。。
 *
 * @author <font size = "20" color = "#3CAA3C"><a href="https://gitee.com/JustryDeng">JustryDeng</a></font> <img src="https://gitee.com/JustryDeng/shared-files/raw/master/JustryDeng/avatar.jpg" />
 * @since 1.0.0
 */
@SuppressWarnings("unused")
public class MergeSortCompleter<T extends Comparable<T>> extends CountedCompleter<Void> {
    
    private final Comparable<T>[] data;
    
    private final int startIndex;
    private int middleIndex;
    private final int endIndex;
    
    private final boolean asc;
    
    /**
     * 进行fork的数组长度阈值
     */
    private final int FORK_THRESHOLD;
    
    /**
     * 默认的进行fork的数组长度阈值
     */
    private static final int DEFAULT_FORK_THRESHOLD = 200;
    
    /**
     * @see this#MergeSortCompleter(MergeSortCompleter, Comparable[], int, int, int, boolean)
     */
    public MergeSortCompleter(MergeSortCompleter parent, Comparable<T>[] data, int startIndex, int endIndex) {
        this(parent, data, startIndex, endIndex, DEFAULT_FORK_THRESHOLD, true);
    }
    
    /**
     * @see this#MergeSortCompleter(MergeSortCompleter, Comparable[], int, int, int, boolean)
     */
    public MergeSortCompleter(MergeSortCompleter parent, Comparable<T>[] data, int startIndex, int endIndex, boolean asc) {
        this(parent, data, startIndex, endIndex, DEFAULT_FORK_THRESHOLD, asc);
    }
    
    /**
     * 构造器
     *
     * @param parent
     *            父任务
     * @param data
     *            数据容器
     * @param startIndex
     *            要被排序的数据的起始索引
     * @param endIndex
     *            要被排序的数据的结尾引
     * @param forkThreshold
     *            进行fork的数组长度阈值
     * @param asc
     *            true-升序; false-降序
     */
    public MergeSortCompleter(MergeSortCompleter parent, Comparable<T>[] data,
                              int startIndex, int endIndex, int forkThreshold, boolean asc) {
        super(parent);
        this.data = data;
        this.startIndex = startIndex;
        this.endIndex = endIndex;
        this.asc = asc;
        FORK_THRESHOLD = forkThreshold;
    }
    
    @Override
    public void compute() {
        // 如果长度>=指定的阈值， 那么fork
        if (endIndex - startIndex >= FORK_THRESHOLD - 1) {
            middleIndex = startIndex + ((endIndex - startIndex) >> 1);
            MergeSortCompleter<T> task1 = new MergeSortCompleter<>(this, data, startIndex, middleIndex, asc);
            MergeSortCompleter<T> task2 = new MergeSortCompleter<>(this, data, middleIndex + 1, endIndex, asc);
            // 对pending进行add操作，必须在fork之前
            this.addToPendingCount(1);
            task1.fork();
            task2.fork();
        // 任务粒度已经足够小了， 不再fork， 直接进行逻辑处理
        } else {
            // 执行排序
            doSort(data, startIndex, endIndex, asc);
            // 主要逻辑处理完后，调用tryComplete, 使执行onCompletion如果需要的话
            tryComplete();
        }
    }
    
    /**
     * 触发onCompletion逻辑
     *
     * @param caller
     *         触发调用onCompletion方法的对象
     */
    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        // middle == 0 说明没有fork过
        if (middleIndex == 0) {
            return;
        }
        merge(data, startIndex, middleIndex, endIndex, asc);
    }
    
    /// ********************************************** 下面的是归并排序实现
    
    /**
     * 归并排序
     *
     * @param data
     *            数据容器
     * @param start
     *            要被排序的数据的起始索引
     * @param end
     *            要被排序的数据的结尾引
     * @param asc
     *            true-升序; false-降序
     */
    public void doSort(Comparable<T>[] data, int start, int end, boolean asc) {
        if (end - start < 2) {
            return;
        }
        int middle = start + ((end - start) >> 1);
        splitAndMerge(data, start, middle, asc);
        splitAndMerge(data, middle + 1, end, asc);
        merge(data, start, middle, end, asc);
    }
    
    /**
     * (两路)拆分、归并 数组
     *
     * @param originArray
     *         数组
     * @param left
     *         数组的起始元素索引
     * @param right
     *         数组的结尾元素索引
     * @param asc
     *         升序/降序。 true-升序; false-降序
     */
    public void splitAndMerge(Comparable<T>[] originArray, int left, int right, boolean asc) {
        // 中间那个数的索引
        int middle = left + ((right - left) >> 1);
        /*
         * 当目标区域要只有一个元素时，不再进行拆分
         *
         * 已知originArray长度大于0, 这里简单数学证明: 当middle = right时，originArray长度为1
         * ∵ middle = (left + right) / 2 且 middle = right
         * ∴ right = (left + right) / 2
         * ∴ 2 * right = left + right
         * ∴ right = left
         * ∴ right = left
         * ∴ originArray长度为1
         */
        if (middle == right) {
            return;
        }
        // 二叉树【前序遍历】, 再次进行拆分
        splitAndMerge(originArray, left, middle, asc);
        splitAndMerge(originArray, middle + 1, right, asc);
        // 合并
        merge(originArray, left, middle, right, asc);
    }
    
    /**
     * 归并两个有序的数组
     *
     * @param originArray
     *         数组。 注:该数组由两个紧邻的 有序数组组成
     * @param left
     *         要归并的第一个数组的起始元素索引
     * @param middle
     *         要归并的第一个数组的结尾元素索引
     * @param right
     *         要归并的第二个数组的结尾元素索引 注:要合并的第二个数组的结尾元素索引为middle + 1
     * @param asc
     *         升序/降序。 true-升序; false-降序
     */
    @SuppressWarnings("unchecked")
    private void merge(Comparable<T>[] originArray, int left, int middle, int right, boolean asc) {
        Comparable<T>[] tmpArray = new Comparable[right - left + 1];
        int i = left, j = middle + 1, tmpIndex = 0;
        int result;
        // 循环比较， 直至其中一个数组所有元素 拷贝至 tmpArray
        while (i <= middle && j <= right) {
            result = originArray[i].compareTo((T) originArray[j]);
            // 控制升序降序
            boolean ascFlag = asc ? result <= 0 : result >= 0;
            if (ascFlag) {
                tmpArray[tmpIndex] = originArray[i];
                i++;
            } else {
                tmpArray[tmpIndex] = originArray[j];
                j++;
            }
            tmpIndex++;
        }
        // 将剩余那个没拷贝完的数组中剩余的元素 拷贝至 tmpArray
        while (i <= middle) {
            tmpArray[tmpIndex] = originArray[i];
            i++;
            tmpIndex++;
        }
        while (j <= right) {
            tmpArray[tmpIndex] = originArray[j];
            j++;
            tmpIndex++;
        }
        // 将临时数组中的元素按顺序拷贝至originArray
        System.arraycopy(tmpArray, 0, originArray, left, tmpArray.length);
    }
    
}